package com.lwt.kettle.common.config.kettle;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IORuntimeException;
import cn.hutool.core.io.resource.ResourceUtil;
import com.lwt.kettle.common.utils.CommUtils;
import com.lwt.kettle.common.utils.SpringContextUtils;
import org.apache.commons.collections.MapUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.www.SlaveServerJobStatus;
import org.pentaho.di.www.SlaveServerTransStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.Map;

/**
 * Submits Kettle jobs and transformations to the remote slave server supplied by
 * {@link KettleServerFactory} and blocks until the remote run has finished.
 *
 * <p>Loading this class initialises the Kettle environment and verifies that a
 * {@link KettleConfig} bean with at least one configured server is available.
 */
public class KettleServer {
    private static final Logger logger = LoggerFactory.getLogger(KettleServer.class);

    /** Delay between two consecutive status polls against the remote server, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    static {
        logger.info(" __           __    __  .__          \n" +
                "|  | __ _____/  |__/  |_|  |   ____  \n" +
                "|  |/ // __ \\   __\\   __\\  | _/ __ \\ \n" +
                "|    <\\  ___/|  |  |  | |  |_\\  ___/ \n" +
                "|__|_ \\\\___  >__|  |__| |____/\\___  >\n" +
                "     \\/    \\/                     \\/ ");
        try {
            KettleEnvironment.init();
            EnvUtil.environmentInit();
        } catch (KettleException e) {
            // Initialisation stays best-effort, but the failure is reported through the
            // application logger rather than printed to stderr.
            logger.error("Failed to initialise the Kettle environment", e);
        }
        KettleConfig kettleConfig = (KettleConfig) SpringContextUtils.getBean(KettleConfig.class);
        if (null == kettleConfig || null == kettleConfig.getServers() || kettleConfig.getServers().size() == 0) {
            throw new RuntimeException("kettle config have a err");
        }
    }


    /**
     * Runs a Kettle job on the remote slave server and waits for it to finish.
     *
     * @param jobName job file name, appended to {@link KettleServerFactory#getJobPath()}
     * @param pMap    values passed to the job as variables and as parameter values; may be null or empty
     * @return the result reported by the remote server once the job is no longer running
     * @throws KettleException if the remote server returns no status for the submitted job
     * @throws Exception       any failure while loading the job, submitting it or polling its status
     */
    public Result executeJob(String jobName, Map<String, String> pMap) throws Exception {
        JobMeta jobMeta;
        // Close the definition stream as soon as the metadata has been parsed.
        try (InputStream definition = getResource(KettleServerFactory.getJobPath() + jobName)) {
            jobMeta = new JobMeta(definition, null, null);
        }
        JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
        jobConfig.setRemoteServer(KettleServerFactory.createSlaveServer());

        // Pass the caller's values on, in the same way executePlan does.
        if (MapUtils.isNotEmpty(pMap)) {
            jobConfig.setVariables(pMap);
            // Iterate the caller's map: jobConfig.getParams() is never populated here,
            // so looping over it would set nothing.
            for (Map.Entry<String, String> param : pMap.entrySet()) {
                jobMeta.setParameterValue(param.getKey(), param.getValue());
            }
        }
        // Submit the job to the remote Kettle service.
        String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobConfig, null, null);
        // Poll until the remote service no longer reports the job as running.
        SlaveServerJobStatus jobStatus;
        do {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            jobStatus = jobConfig.getRemoteServer().getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
        } while (!CommUtils.isNull(jobStatus) && jobStatus.isRunning());
        if (CommUtils.isNull(jobStatus)) {
            throw new KettleException("No status returned by the remote server for job: " + jobName);
        }
        return jobStatus.getResult();
    }


    /**
     * Runs a Kettle transformation on the remote slave server and waits for it to finish.
     *
     * @param planName transformation file name, appended to {@link KettleServerFactory#getPlanPath()}
     * @param pMap     values passed to the transformation as variables and as parameter values;
     *                 may be null or empty
     * @return the result reported by the remote server once the transformation is no longer running
     * @throws KettleException if the remote server returns no status for the submitted transformation
     * @throws Exception       any failure while loading the transformation, submitting it or polling its status
     */
    public Result executePlan(String planName, Map<String, String> pMap) throws Exception {
        TransMeta tranMeta;
        // Close the definition stream as soon as the metadata has been parsed.
        try (InputStream definition = getResource(KettleServerFactory.getPlanPath() + planName)) {
            tranMeta = new TransMeta(definition, null, Boolean.TRUE, null, null);
        }
        TransExecutionConfiguration tranConfig = new TransExecutionConfiguration();
        tranConfig.setRemoteServer(KettleServerFactory.createSlaveServer());
        // Pass the caller's values on as variables and as parameter values.
        if (MapUtils.isNotEmpty(pMap)) {
            tranConfig.setVariables(pMap);
            for (Map.Entry<String, String> param : pMap.entrySet()) {
                tranMeta.setParameterValue(param.getKey(), param.getValue());
            }
        }
        // Submit the transformation to the remote Kettle service.
        String lastCarteObjectId = Trans.sendToSlaveServer(tranMeta, tranConfig, null, null);
        // Poll until the remote service no longer reports the transformation as running.
        SlaveServerTransStatus transStatus;
        do {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            transStatus = tranConfig.getRemoteServer().getTransStatus(tranMeta.getName(), lastCarteObjectId, 0);
        } while (!CommUtils.isNull(transStatus) && transStatus.isRunning());
        if (CommUtils.isNull(transStatus)) {
            throw new KettleException("No status returned by the remote server for transformation: " + planName);
        }
        return transStatus.getResult();
    }


    /**
     * Tells whether a run finished without errors.
     *
     * @param result the result of a job or transformation run; may be null
     * @return {@code false} when {@code result} is null or reports at least one error,
     *         {@code true} otherwise
     */
    public Boolean isSuccess(Result result) {
        return !(CommUtils.isNull(result) || result.getNrErrors() > 0);
    }


    /**
     * Opens an ETL definition file, trying the resource lookup first and the local
     * file system second.
     *
     * @param path location of the file
     * @return an open stream on the file; the caller is responsible for closing it
     * @throws IORuntimeException if the file system fallback fails to open the file
     */
    private InputStream getResource(String path) {
        InputStream stream = ResourceUtil.getStreamSafe(path);
        if (CommUtils.isNull(stream)) {
            // The resource lookup found nothing, so fall back to a local file.
            try {
                stream = FileUtil.getInputStream(path);
            } catch (IORuntimeException e) {
                // Keep the original failure as the cause so the root problem stays visible.
                throw new IORuntimeException("Kettle File Not Found： " + path, e);
            }
        }
        return stream;
    }


}
